{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "e4d5aaee",
   "metadata": {},
   "source": [
    "## Creating Tests for a Simple Pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "19cf8911",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pytest\n",
    "\n",
    "def extract(filepath):\n",
    "    \"\"\"\n",
    "    Extraction function that reads an integer value from a file.\n",
    "    \"\"\"\n",
    "    with open(filepath, 'r') as file:\n",
    "        input_data = {'value': int(file.read().strip())}\n",
    "    return input_data\n",
    "\n",
    "\n",
    "def transform(input_data): \n",
    "    \"\"\"\n",
    "    Transformation function to double the input value.\n",
    "    Assumes input_data is a dictionary with 'value' key.\n",
    "    Now also ensures that values are positive.\n",
    "    \"\"\"\n",
    "\n",
    "    value = input_data['value']\n",
    "\n",
    "    if value < 0:\n",
    "        raise ValueError('Value must be positive.')\n",
    "\n",
    "    output_data = {'value': input_data['value'] * 2}\n",
    "    return output_data\n",
    "\n",
    "\n",
    "def load(output_data, database):\n",
    "    \"\"\"\n",
    "    Loading function that moves the transformed data into a database.\n",
    "    Assumes database is a dictionary and output_data contains 'value' key.\n",
    "    \"\"\"\n",
    "    database['value'] = output_data['value']"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "79d57e0c",
   "metadata": {},
   "source": [
    "### Unit Testing"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fd1d9b2d",
   "metadata": {},
   "outputs": [],
   "source": [
    "def test_transform_unittest():\n",
    "    \n",
    "    # define input and expected output data formats\n",
    "    \n",
    "    # -- < your code here > --\n",
    "    \n",
    "    # exercise\n",
    "    \n",
    "     # -- < your code here > --\n",
    "\n",
    "    # verify\n",
    "    \n",
    "     assert result # -- < your code here > --"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d58c0336",
   "metadata": {},
   "source": [
    "### Validation Testing"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7cd7d890",
   "metadata": {},
   "outputs": [],
   "source": [
    "def test_transform_validation():\n",
    "    \"\"\"\n",
    "    Validation-test scaffold: transform() must reject the prepared input.\n",
    "\n",
    "    The placeholder step is expected to define `input_data`. The test then\n",
    "    requires transform(input_data) to raise ValueError and keeps the\n",
    "    captured exception in `excinfo` for the final assertion.\n",
    "    \"\"\"\n",
    "\n",
    "    # define input data format (must bind the name `input_data` used below)\n",
    "    \n",
    "    # -- < your code here > --\n",
    "\n",
    "    # this block passes only if transform() raises ValueError inside it\n",
    "    with pytest.raises(ValueError) as excinfo:\n",
    "        transform(input_data)\n",
    "        \n",
    "    assert # -- < your code here > --"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "20eedb9f",
   "metadata": {},
   "source": [
    "### Integration Testing "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b046334c",
   "metadata": {},
   "outputs": [],
   "source": [
    "def test_load_transform_integration():\n",
    "    \"\"\"\n",
    "    Integration-test scaffold: feed transform()'s output into load().\n",
    "\n",
    "    The placeholder step is expected to define both `input_data` and\n",
    "    `database`, which are read by the statements below.\n",
    "    \"\"\"\n",
    "\n",
    "    # define input and expected output data formats\n",
    "    # -- < your code here > --\n",
    "\n",
    "    # add transform and load\n",
    "    transformed_data = transform(input_data)\n",
    "    # load() writes the transformed value into `database` in place\n",
    "    load(transformed_data, database)\n",
    "\n",
    "    # verify\n",
    "    assert database # -- < your code here > --"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1053da44",
   "metadata": {},
   "source": [
    "### End-to-End Testing"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e8bc3e7f",
   "metadata": {},
   "outputs": [],
   "source": [
    "def test_pipeline_end_to_end():\n",
    "    \"\"\"\n",
    "    End-to-end test scaffold: run extract, transform and load on a real file.\n",
    "\n",
    "    The first placeholder step is expected to define `test_input_file` and\n",
    "    `database`, which are read by the statements below.\n",
    "    \"\"\"\n",
    "\n",
    "    # define input and expected output data formats\n",
    "    \n",
    "    # -- < your code here > --\n",
    "\n",
    "    # add open file to the test with value 10\n",
    "    # (creates or overwrites the file at test_input_file with the text '10')\n",
    "    # NOTE(review): nothing in this scaffold removes the file afterwards -- confirm whether cleanup is needed\n",
    "    with open(test_input_file, 'w') as file:\n",
    "        file.write('10')\n",
    "\n",
    "    # add extract, transform, load\n",
    "    # -- < your code here > --\n",
    "\n",
    "    # verify\n",
    "    assert database # -- < your code here > --"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "90ca6501",
   "metadata": {},
   "source": [
    "### Performance Testing"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0c330a5c",
   "metadata": {},
   "outputs": [],
   "source": [
    "import time"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a4a5f287",
   "metadata": {},
   "outputs": [],
   "source": [
    "def test_transform_performance():\n",
    "    \"\"\"\n",
    "    Performance-test scaffold: time transform() over a batch of inputs.\n",
    "\n",
    "    The placeholder step is expected to define `input_data` as an iterable\n",
    "    of records accepted by transform(). The elapsed time is printed; no\n",
    "    threshold is asserted.\n",
    "    \"\"\"\n",
    "\n",
    "    # define input data formats\n",
    "    # -- < your code here > --\n",
    "\n",
    "    # define start time\n",
    "    # perf_counter() is a monotonic, high-resolution clock, so the measurement\n",
    "    # is not distorted by system clock adjustments the way time.time() can be\n",
    "    start_time = time.perf_counter()\n",
    "\n",
    "    # iterate through the transform step\n",
    "    for data in input_data:\n",
    "        transform(data)\n",
    "\n",
    "    # define the end time\n",
    "    end_time = time.perf_counter()\n",
    "    elapsed_time = end_time - start_time\n",
    "    print(f\"Elapsed time for processing 1 million data points was {elapsed_time} seconds.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "98ac3b9d",
   "metadata": {},
   "source": [
    "### Resilience Testing"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "75db8c19",
   "metadata": {},
   "outputs": [],
   "source": [
    "import random"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e17ceffd",
   "metadata": {},
   "outputs": [],
   "source": [
    "def test_transform_resilience_timeout(input_data):\n",
    "    \"\"\"\n",
    "    Scaffold for a transform step that fails intermittently.\n",
    "\n",
    "    Transformation function to double the input value.\n",
    "    Assumes input_data is a dictionary with 'value' key.\n",
    "    Before doing any work it raises TimeoutError('Temporary network outage.')\n",
    "    whenever random.random() returns a value below 0.1.\n",
    "\n",
    "    NOTE(review): until the placeholders are completed the function returns\n",
    "    None. Despite the test_ prefix it takes an argument, so it reads like a\n",
    "    helper rather than a test -- confirm the intended name.\n",
    "    \"\"\"\n",
    "\n",
    "    # includes a random chance to raise a TimeoutError\n",
    "    if random.random() < 0.1:  # 10% chance to raise an error\n",
    "        raise TimeoutError('Temporary network outage.')\n",
    "    \n",
    "    # -- < your code here > --\n",
    "    \n",
    "    return # -- < your code here > --"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0fa7a8d9",
   "metadata": {},
   "outputs": [],
   "source": [
    "def test_transform_resilience_timeout_retry5():\n",
    "    \"\"\"\n",
    "    Resilience-test scaffold: call transform() with up to five attempts.\n",
    "\n",
    "    The placeholder step is expected to define `input_data`. A TimeoutError\n",
    "    on attempts one to four is swallowed and the call is retried; on the\n",
    "    fifth attempt it is re-raised. The first successful call ends the loop\n",
    "    and leaves its return value in `result`.\n",
    "    \"\"\"\n",
    "    \n",
    "    # Setup\n",
    "    \n",
    "    # -- < your code here > --\n",
    "    \n",
    "    # Exercise\n",
    "    for i in range(5):\n",
    "        try:\n",
    "            # NOTE(review): transform() as defined in this notebook never raises\n",
    "            # TimeoutError; presumably a flaky variant is meant here -- confirm\n",
    "            result = transform(input_data)\n",
    "            break\n",
    "        except TimeoutError:\n",
    "            if i == 4:  # We've reached our maximum attempts\n",
    "                raise  # Re-raise the last exception\n",
    "    # NOTE(review): a for/else branch runs only when the loop ends without break;\n",
    "    # the fifth TimeoutError is re-raised above, so this looks unreachable -- confirm\n",
    "    else:\n",
    "        raise ValueError(\"Transform function failed after 5 attempts.\")\n",
    "        \n",
    "    # Verify\n",
    "    assert # -- < your code here > --"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2173eb8a",
   "metadata": {},
   "source": [
    "## Monitoring ETL Pipelines"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "397767ec",
   "metadata": {},
   "outputs": [],
   "source": [
    "import time"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f5da76f4",
   "metadata": {},
   "outputs": [],
   "source": [
    "def extract_data():\n",
    "    # -- < your code here > --\n",
    "    time.sleep(random.randint(1, 3))\n",
    "    return [1, 2, 3, 4, 5]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "786ab7ff",
   "metadata": {},
   "outputs": [],
   "source": [
    "def transform_data(data):\n",
    "    # -- < your code here > --\n",
    "    time.sleep(random.randint(1, 3))\n",
    "    return [x * 10 for x in data]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4157e59a",
   "metadata": {},
   "outputs": [],
   "source": [
    "def load_data(transformed_data):\n",
    "    # -- < your code here > --\n",
    "    time.sleep(random.randint(1, 3))\n",
    "    print(\"Data loaded successfully.\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ac94be0c",
   "metadata": {},
   "outputs": [],
   "source": [
    "def monitor_etl_pipeline():\n",
    "    \"\"\"\n",
    "    Run extract -> transform -> load once, printing progress for each stage.\n",
    "\n",
    "    Any Exception raised by a stage is caught and reported on stdout instead\n",
    "    of propagating to the caller.\n",
    "    \"\"\"\n",
    "    print(\"Monitoring ETL Pipeline...\")\n",
    "    try:\n",
    "        # Stage 1: extract\n",
    "        print(\"Extracting data...\")\n",
    "        raw_records = extract_data()\n",
    "        print(\"Data extracted successfully.\")\n",
    "\n",
    "        # Stage 2: transform\n",
    "        print(\"Transforming data...\")\n",
    "        scaled_records = transform_data(raw_records)\n",
    "        print(\"Data transformed successfully.\")\n",
    "\n",
    "        # Stage 3: load (load_data prints its own success message)\n",
    "        print(\"Loading data...\")\n",
    "        load_data(scaled_records)\n",
    "\n",
    "        # All three stages finished without raising\n",
    "        print(\"ETL Pipeline completed successfully.\")\n",
    "\n",
    "    except Exception as error:\n",
    "        # Report the failure rather than re-raising it\n",
    "        print(f\"ETL Pipeline failed. Error: {error}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "014762d1",
   "metadata": {},
   "outputs": [],
   "source": [
    "monitor_etl_pipeline()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
